---
title: Execute on Celery | Dagster
description: The dagster-celery executor uses Celery to execute Dagster solids.
---

# Execute on Celery

[Celery](http://www.celeryproject.org/) is a longstanding open-source
Python distributed task queue system, with support for a variety of
queues (brokers) and result persistence strategies (backends).

The `dagster-celery` executor uses Celery to satisfy three typical
requirements when running pipelines in production:

1. Parallel execution capacity that scales horizontally across multiple
   compute nodes.
2. Separate queues to isolate execution and control external resource
   usage at the solid level.
3. Priority-based execution at the solid level.

The dagster-celery executor compiles a pipeline definition and its
associated configuration into a concrete execution plan, and then
submits each execution step to the broker as a separate Celery task. The
dagster-celery workers then pick up tasks from the queues to which they
are subscribed, according to the priorities assigned to each task, and
execute the steps to which the tasks correspond.
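
To make this dispatch model concrete, here is a toy sketch that uses only the standard library. It is not how dagster-celery or Celery is implemented: the `ToyBroker` class, the queue and step names, and the convention that a larger number means higher priority are all assumptions made for illustration. In particular, a real broker does not necessarily compare priorities across queues the way `next_step` does here.

```python
import heapq
from collections import defaultdict
from itertools import count


class ToyBroker:
    """Hypothetical in-memory broker: one priority heap per named queue."""

    def __init__(self):
        self._queues = defaultdict(list)
        self._seq = count()  # tie-breaker that preserves submission order

    def submit(self, step_name, queue="default", priority=0):
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._queues[queue], (-priority, next(self._seq), step_name))

    def next_step(self, subscribed_queues):
        """Pop the highest-priority pending step from the subscribed queues, or return None."""
        candidates = [q for q in subscribed_queues if self._queues[q]]
        if not candidates:
            return None
        best = min(candidates, key=lambda q: self._queues[q][0])
        return heapq.heappop(self._queues[best])[2]


broker = ToyBroker()
broker.submit("load_raw", queue="default", priority=1)
broker.submit("train_model", queue="gpu", priority=5)
broker.submit("clean", queue="default", priority=3)

# A worker subscribed only to "default" never sees the step on the "gpu" queue.
default_worker_order = []
while True:
    step = broker.next_step(["default"])
    if step is None:
        break
    default_worker_order.append(step)

print(default_worker_order)  # ['clean', 'load_raw']
```

The point of the sketch is the separation of concerns: queues decide which workers may run a step, and priorities decide the order within what a worker can see.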

## Quick start

Let's construct a very parallel toy pipeline and then execute it using
the Celery executor.

```python file=/deploying/celery_pipeline.py
from dagster import ModeDefinition, default_executors, fs_io_manager, pipeline, solid
from dagster_celery import celery_executor

celery_mode_defs = [
    ModeDefinition(
        resource_defs={"io_manager": fs_io_manager},
        executor_defs=default_executors + [celery_executor],
    )
]


@solid
def not_much(_):
    return


@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
    for i in range(50):
        not_much.alias("not_much_" + str(i))()
```

Save this pipeline as `celery_pipeline.py`.

To run the Celery executor, you'll need a running broker like
[RabbitMQ](https://www.rabbitmq.com/). With Docker, this is as simple
as:

```shell
docker run -p 5672:5672 rabbitmq:3.8.2
```
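
Before starting workers, it can help to confirm that the broker port is actually accepting connections. The helper below is a hypothetical convenience, not part of Dagster or Celery; it assumes the broker is exposed on `localhost:5672`, as in the Docker command above. A successful TCP connection only shows that something is listening on the port, not that RabbitMQ has finished starting up.

```python
import socket


def broker_is_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_is_reachable())
```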

You will also need to run a Celery worker to execute tasks. From the
same directory in which you saved the pipeline file (we'll explain why
in a moment), run:

```shell
dagster-celery worker start -A dagster_celery.app
```

Next, start Dagit so that you can launch the pipeline. Again, from the
same directory in which you saved the pipeline file, run:

```shell
dagit -f celery_pipeline.py
```

Now you can execute the parallel pipeline from Dagit with the following
config:

```yaml file=/deploying/celery.yaml
execution:
  celery:
```

## Ensuring workers are in sync

In the quick start, we cheated in a couple of ways:

- By running a single Celery worker on the same node as Dagit, so that
  local ephemeral run storage and event log storage could be shared
  between them, and so that filesystem intermediates storage would be
  sufficient to exchange values between the worker's task executions.
- By running the Celery worker in the same directory to which we saved
  `celery_pipeline.py`, so that our Dagster code would be available
  both to Dagit and to the worker: i.e., the worker and Dagit can both
  find the pipeline definition in the same file
  (`-f celery_pipeline.py`).

In production, more configuration is required.

### 1. Persistent run and event log storage

First, ensure that appropriate persistent run and event log storage,
e.g., **`PostgresRunStorage`** and **`PostgresEventLogStorage`**,
are configured on your Dagster instance, so that Dagit
and the workers can communicate information about the run and events.
You can do this by adding a block such as the following to your
`dagster.yaml` (by default, Dagster will look for this file at
`$DAGSTER_HOME/dagster.yaml` when the `DAGSTER_HOME` environment
variable is set):

```yaml file=/deploying/dagster-pg.yaml
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }
```

**The same instance config must be present in Dagit's environment and
in the workers' environments.** If you haven't already, please read
about [the Dagster instance](/deployment/dagster-instance).
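
One lightweight way to check that two environments share the same instance config is to compare a hash of the file on each node. The helper below is a hypothetical sketch, not part of Dagster; it assumes only the `$DAGSTER_HOME/dagster.yaml` location described above.

```python
import hashlib
import os
from pathlib import Path


def instance_config_fingerprint(dagster_home=None):
    """Return the SHA-256 hex digest of dagster.yaml, or None if it cannot be found."""
    home = dagster_home or os.environ.get("DAGSTER_HOME")
    if not home:
        return None
    config_file = Path(home) / "dagster.yaml"
    if not config_file.is_file():
        return None
    return hashlib.sha256(config_file.read_bytes()).hexdigest()


if __name__ == "__main__":
    # Run this on the Dagit node and on each worker node, then compare the output.
    print(instance_config_fingerprint())
```

Identical digests mean the files are byte-for-byte the same. Different digests do not necessarily mean the configs disagree (whitespace or comments may differ), but they are a prompt to look more closely.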

### 2. Persistent system storage

Second, data is passed between solids that may run in different worker
processes, possibly on different nodes. For the pipeline runs you
execute with the Celery executor, you must therefore use storage that is
accessible from all nodes running Celery workers -- for example, an S3
or GCS bucket, or an NFS mount. An appropriate system storage -- such as
**`s3_system_storage`** or **`gcs_system_storage`** -- must be available
on a **`ModeDefinition`** attached to the pipeline.

### 3. Executor & worker config

Third, if you are using custom config for your pipeline runs -- for
instance, using a different Celery broker URL or backend -- you must
ensure that your workers start up with this config. Make sure your
executor config is present in a YAML file accessible to the workers and
start them with the `-y` parameter as follows:

```shell
dagster-celery worker start -y /path/to/celery_config.yaml
```
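
As a sketch of what such a file might contain, the snippet below writes one using only the standard library, relying on the fact that JSON is also valid YAML. The `execution.celery.config` nesting and the `broker` and `backend` keys are assumptions about the executor's config schema, and the broker hostname and file name are placeholders; check them against the dagster-celery version you are running.

```python
import json
from pathlib import Path

# Assumed shape: the same `execution` block you would supply as run config.
celery_run_config = {
    "execution": {
        "celery": {
            "config": {
                # Placeholder URLs; substitute your own broker and backend.
                "broker": "pyamqp://guest@rabbitmq.example.internal:5672//",
                "backend": "rpc://",
            }
        }
    }
}


def write_celery_config(path):
    """Write the config as JSON, which YAML parsers also accept."""
    path = Path(path)
    path.write_text(json.dumps(celery_run_config, indent=2) + "\n")
    return path


config_path = write_celery_config("celery_config.yaml")
print(config_path.read_text())
```

Generating the worker file and the run config from one dictionary is a simple way to keep the two from drifting apart.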

### 4. Dagster code

Finally, you will need to make sure that the Dagster code you want the
workers to execute is present in their environment, and that it is in
sync with the code present on the node running Dagit. The easiest way to
do this is typically to package your code into a Python module, and for
the `workspace.yaml` you use to run Dagit to load from that module.

In the quick start, we accomplished this by starting Dagit with the `-f`
parameter, which told it which file to find our pipeline in, and then
made sure that we started our Celery worker from the same point in the
filesystem -- so that the pipeline was available at the same place.
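
If you package your code as a Python module, a quick sanity check to run in each worker's environment is to ask Python where it would import that module from. This is a minimal sketch: `my_pipelines` is a hypothetical module name, and the check confirms only that the module is importable, not that its contents match what Dagit has loaded.

```python
import importlib.util


def module_location(module_name):
    """Return the file a module would be imported from, or None if it is not importable."""
    try:
        spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError):
        # Raised, for example, when a parent package of a dotted name is missing.
        return None
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # Replace the hypothetical name with the module your workspace loads from.
    print(module_location("my_pipelines"))
```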

## CLI

In the quick start above, we started our workers using the
`dagster-celery` CLI, rather than by invoking Celery directly. This CLI
is intended as a convenience wrapper for the common case that shields
users from the full complexity of Celery configuration. Of course, it's
still possible to start Celery workers directly -- but please let us
know if you find yourself going down this path.

Run `dagster-celery worker start` to start new workers,
`dagster-celery worker list` to view running workers, and
`dagster-celery worker terminate` to terminate workers. For all of these
commands, it's essential that you have your broker running.

If you are running your Celery cluster with custom config (e.g., the
broker URL or backend), you should also pass a path to a YAML file
containing that config to the `-y` parameter of all these invocations.
Otherwise, workers will not start up with the config you are expecting,
and the CLI may not be able to find running workers to list or terminate
them.
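
Because forgetting `-y` on one of these commands is an easy mistake, you may want to build the command lines in one place. The helper below is a hypothetical sketch that only assembles the three subcommands named above; it does not run them, and it assumes no options beyond `-y`.

```python
import shlex


def dagster_celery_worker_command(action, config_yaml=None):
    """Build the argv list for a `dagster-celery worker` subcommand."""
    if action not in ("start", "list", "terminate"):
        raise ValueError("unsupported action: {}".format(action))
    argv = ["dagster-celery", "worker", action]
    if config_yaml is not None:
        # Pass the same config file to every invocation.
        argv += ["-y", str(config_yaml)]
    return argv


for action in ("start", "list", "terminate"):
    print(shlex.join(dagster_celery_worker_command(action, "/path/to/celery_config.yaml")))
```

Once the broker is running, the returned list can be handed to `subprocess.run`.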

This system is deliberately designed to make the full range of Celery
config available as needed. Keep in mind, though, that Celery exposes
many knobs, and many combinations of them are not compatible with each
other, so change settings with care. That said, workloads differ widely.
If you are running many pipelines with very long-running or very
short-running tasks, for instance, and you are already comfortable
tuning Celery, you might find that changing some of the settings works
better for your case.

## Monitoring and debugging

There are several available tools for monitoring and debugging your
queues and workers. First, of course, is Dagit, which will display event
logs and the stdout/stderr from your pipeline executions. You can also
observe the logs generated by your broker and by the worker processes.

To debug broker/queue level issues, you should use the monitoring tools
provided by the broker you're running. RabbitMQ includes a [monitoring
API](https://www.rabbitmq.com/monitoring.html) and has first-class
support for Prometheus & Grafana integration in production.

The Celery ecosystem also includes the excellent
[Flower](https://flower.readthedocs.io/en/latest/) tool for monitoring
your Celery workers and queues. This can be very useful in understanding
how workers interact with the queue.

## Broker and backend

At the moment, dagster-celery is tested using the RabbitMQ broker and
the default RPC backend. At least one team is running dagster-celery
using SQS as the broker, and we intend to expand the scope of
broker/backend matrix testing, probably starting with the Postgres
backend.
